// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
Python version:
Both df.age and df['age'] work; the latter is recommended because it avoids collisions with DataFrame attribute names (see the example below).
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)
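For the Python side of the untyped operations shown in Java above, here is a minimal sketch that assumes the same spark session and df from the previous example; column references use the df['age'] bracket form recommended above.

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

# Select people older than 21
df.filter(df['age'] > 21).show()

# Count people by age
df.groupBy("age").count().show()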
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Long> longEncoder = Encoders.LONG();
Dataset<Long> primitiveDS = spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder);
Dataset<Long> transformedDS = primitiveDS.map(
    (MapFunction<Long, Long>) value -> value + 1L,
    longEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
  });
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
    stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
Python version:
Spark SQL can convert an RDD of Row objects to a DataFrame.
Row objects are constructed by passing a list of key/value pairs (as kwargs) to the Row class.
The keys define the column names of the table, and the types are inferred by sampling the dataset.
from pyspark.sql import Row
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are DataFrame objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
// Create an RDD of text lines (needed here, since this example parses raw strings)
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();
// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());
});
// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
# Import data types
from pyspark.sql.types import StringType, StructType, StructField
sc = spark.sparkContext
# Load a text file and split each line into fields.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
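The Java version of this example goes on to map the result rows to strings; as a minimal Python follow-up, assuming the results DataFrame from the line above, the query output can simply be displayed:

# The results of SQL queries are DataFrame objects; show() prints them
results.show()
# Expected output for the sample people.txt (row order may vary):
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+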